package com.my.service.task.kmeans;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.List;

/**
 * Flink map function that assigns a single feature vector to the nearest of a set of
 * previously computed k-means centers.
 *
 * <p>Each input record is a comma-separated line whose first three fields are parsed as
 * floats and used as the feature vector; any further fields are ignored.
 */
public class KMeansFinalMap implements MapFunction<String, Point>{

    /** Cluster centers computed beforehand; every input vector is compared against each one. */
    private final List<Point> centers;
    /** Helper used to measure the distance between an input vector and a center. */
    private final DistanceCompute disC = new DistanceCompute();

    /**
     * @param centers the already-computed cluster centers to classify against
     */
    public KMeansFinalMap(List<Point> centers){
        this.centers = centers;
    }

    /**
     * Parses one feature vector and labels it with the closest center.
     *
     * @param s a line of the form {@code "v1,v2,v3"}
     * @return the parsed point with its cluster id, distance and cluster point set to the
     *         nearest center, or {@code null} when {@code s} is blank. If there are no
     *         centers, or no center yields a comparable (non-NaN, finite) distance, the
     *         point is returned without a cluster assignment.
     * @throws ArrayIndexOutOfBoundsException if {@code s} has fewer than three fields
     * @throws NumberFormatException if one of the first three fields is not a valid float
     */
    @Override
    public Point map(String s){
        if(StringUtils.isBlank(s)){
            return null;
        }
        String[] fields = s.split(",");
        float[] features = new float[]{
                Float.parseFloat(fields[0]),
                Float.parseFloat(fields[1]),
                Float.parseFloat(fields[2])
        };
        // First constructor argument is fixed at 1, as in the original code
        // (presumably a placeholder id — confirm against Point).
        Point pointForS = new Point(1, features);

        // Start from +infinity rather than Integer.MAX_VALUE so that arbitrarily large
        // finite distances can still win the comparison.
        float minDist = Float.POSITIVE_INFINITY;
        for (Point center : centers) {
            float dist = (float) disC.getEuclideanDis(pointForS, center);
            // Strict '<' keeps the first center on ties and skips NaN distances, which
            // would otherwise poison the running minimum and let the last center win.
            if (dist < minDist) {
                minDist = dist;
                pointForS.setClusterId(center.getId());
                pointForS.setDist(minDist);
                pointForS.setClusterPoint(center);
            }
        }
        return pointForS;
    }
}
